Stream Analytics Tutorial

Overview

Welcome to the stream analytics tutorial for EpiData. In this tutorial we will perform near real-time stream analytics on sample weather data acquired from a simulated wireless sensor network.

Package and Module Imports

As a first step, we will import packages and modules required for this tutorial. Since EpiData Context (ec) is required to use the application, it is implicitly imported. Sample functions for near real-time analytics are avaialable in EpiData Analytics package. Other packages and modules, such as datetime, pandas and matplotlib, can also be imported at this time.


In [ ]:
#from epidata.context import ec
from epidata.analytics import *

%matplotlib inline
from datetime import datetime, timedelta
import pandas as pd
import time
import pylab as pl
from IPython import display
import json

Stream Analysis

Function Definition

EpiData supports development and deployment of custom algorithms via Jupyter Notebook. Below, we define python functions for substituting extreme outliers and aggregating temperature measurements. These functions can be operated on near real-time and historic data. In this tutorial, we will apply the functions on near real-time data available from Kafka 'measurements' and 'measurements_cleansed' topics


In [ ]:
import pandas as pd
import math, numbers

def substitute_demo(df, meas_names, method="rolling", size=3):
    """
    Substitute missing measurement values within a data frame, using the specified method.
    """

    df["meas_value"].replace(250, np.nan, inplace=True)
    
    for meas_name in meas_names:

        if (method == "rolling"):
            if ((size % 2 == 0) and (size != 0)): size += 1   
            if df.loc[df["meas_name"]==meas_name].size > 0:
                indices = df.loc[df["meas_name"] == meas_name].index[df.loc[df["meas_name"] == meas_name]["meas_value"].apply(
                    lambda x: not isinstance(x, basestring) and (x == None or np.isnan(x)))]
                substitutes = df.loc[df["meas_name"]==meas_name]["meas_value"].rolling( window=size, min_periods=1, center=True).mean()
            
                df["meas_value"].fillna(substitutes, inplace=True)
                df.loc[indices, "meas_flag"] = "substituted"
                df.loc[indices, "meas_method"] = "rolling average"
        else:
            raise ValueError("Unsupported substitution method: ", repr(method))
    
    return df

In [ ]:
import pandas as pd
import numpy as np
import json

def subgroup_statistics(row):
    row['start_time'] = np.min(row["ts"])
    row["stop_time"] = np.max(row["ts"])
    row["meas_summary_name"] = "statistics"
    row["meas_summary_value"] = json.dumps({'count': row["meas_value"].count(), 'mean': row["meas_value"].mean(),
                                            'std': row["meas_value"].std(), 'min': row["meas_value"].min(), 
                                            'max': row["meas_value"].max()})
    row["meas_summary_description"] = "descriptive statistics"
    return row

def meas_statistics_demo(df, meas_names, method="standard"):
    """
    Compute statistics on measurement values within a data frame, using the specified method.
    """
    
    if (method == "standard"):
        df_grouped = df.loc[df["meas_name"].isin(meas_names)].groupby(["company", "site", "station", "sensor"], 
                            as_index=False)
        df_summary = df_grouped.apply(subgroup_statistics).loc[:, ["company", "site", "station", "sensor",
            "start_time", "stop_time", "event", "meas_name", "meas_summary_name", "meas_summary_value", 
            "meas_summary_description"]].drop_duplicates()
    else:
        raise ValueError("Unsupported summary method: ", repr(method))
                
    return df_summary

Transformations and Streams

The analytics algorithms are executed on near real-time data through transformations. A transformation specifies the function, its parameters and destination. The destination can be one of the database tables, namely 'measurements_cleansed' or 'measurements_summary', or another Kafka topic.

Once the transformations are defined, they are initiated via ec.create_stream(transformations, data_source, batch_duration) function call.


In [ ]:
#Stop current near real-time processing
ec.stop_streaming()

In [ ]:
# Define tranformations and steam operations
op1 = ec.create_transformation(substitute_demo, [["Temperature", "Wind_Speed"], "rolling", 3], "measurements_substituted")
ec.create_stream([op1], "measurements")

op2 = ec.create_transformation(identity, [], "measurements_cleansed")
op3 = ec.create_transformation(meas_statistics, [["Temperature", "Wind_Speed"], "standard"], "measurements_summary")
ec.create_stream([op2, op3],"measurements_substituted")

# Start near real-time processing
ec.start_streaming()

Data Ingestion

We can now start data ingestion from simulated wireless sensor network. To do so, you can download and run the sensor_data_with_outliers.py example shown in the image below.

Data Query and Visualization

We query the original and processed data from Kafka queue using Kafka Consumer. The data obtained from the quey is visualized using Bokeh charts.


In [ ]:
from bokeh.io import push_notebook, show, output_notebook
from bokeh.layouts import row, column
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource

from kafka import KafkaConsumer
import json
from pandas.io.json import json_normalize

output_notebook()

In [ ]:
plot1 = figure(plot_width=750, plot_height=200, x_axis_type='datetime', y_range=(30, 300))
plot2 = figure(plot_width=750, plot_height=200, x_axis_type='datetime', y_range=(30, 300))
df_kafka_init = pd.DataFrame(columns = ["ts", "meas_value"])
test_data_1 = ColumnDataSource(data=df_kafka_init.to_dict(orient='list'))
test_data_2 = ColumnDataSource(data=df_kafka_init.to_dict(orient='list'))
meas_name = "Temperature"

plot1.circle("ts", "meas_value", source=test_data_1, legend=meas_name, line_color='orangered', line_width=1.5)
line1 = plot1.line("ts", "meas_value", source=test_data_1, legend=meas_name, line_color='orangered', line_width=1.5)
plot1.legend.location = "top_right"
plot2.circle("ts", "meas_value", source=test_data_2, legend=meas_name, line_color='blue', line_width=1.5)
line2 = plot2.line("ts", "meas_value", source=test_data_2, legend=meas_name, line_color='blue', line_width=1.5)
plot2.legend.location = "top_right"

In [ ]:
consumer = KafkaConsumer()
consumer.subscribe(['measurements', 'measurements_substituted'])
delay = .1

handle = show(column(plot1, plot2), notebook_handle=True)

In [ ]:
for message in consumer:
    topic = message.topic
    measurements = json.loads(message.value)
    df_kafka = json_normalize(measurements)
    df_kafka["meas_value"] = np.nan if "meas_value" not in measurements else measurements["meas_value"]
    df_kafka = df_kafka.loc[df_kafka["meas_name"]==meas_name]    
    df_kafka = df_kafka[["ts", "meas_value"]]
    df_kafka["ts"] = df_kafka["ts"].apply(lambda x: pd.to_datetime(x, unit='ms').tz_localize('UTC').tz_convert('US/Pacific')) 
    
    if (not df_kafka.empty):
        if (topic == 'measurements'):
            test_data_1.stream(df_kafka.to_dict(orient='list'))
        if (topic == 'measurements_substituted'):
            test_data_2.stream(df_kafka.to_dict(orient='list'))
        push_notebook(handle=handle)
        
    time.sleep(delay)

Another way to query and visualize processed data is using ec.query_measurements_cleansed(..) and ec.query_measurements_summary(..) functions. For our example, we specify paramaters that match sample data set, and query the aggregated values using ec.query_measurements_summary(..) function call.


In [ ]:
# QUERY MEASUREMENTS_CLEANSED TABLE

primary_key={"company": "EpiData", "site": "San_Jose", "station":"WSN-1", 
             "sensor": ["Temperature_Probe", "RH_Probe", "Anemometer"]}
start_time = datetime.strptime('8/19/2017 00:00:00', '%m/%d/%Y %H:%M:%S')
stop_time = datetime.strptime('8/20/2017 00:00:00', '%m/%d/%Y %H:%M:%S')
df_cleansed = ec.query_measurements_cleansed(primary_key, start_time, stop_time)
print "Number of records:", df_cleansed.count()

df_cleansed_local = df_cleansed.toPandas()
df_cleansed_local[df_cleansed_local["meas_name"]=="Temperature"].tail(10).sort_values(by="ts",ascending=False)

In [ ]:
# QUERY MEASUREMNTS_SUMMARY TABLE

primary_key={"company": "EpiData", "site": "San_Jose", "station":"WSN-1", "sensor": ["Temperature_Probe"]}
start_time = datetime.strptime('8/19/2017 00:00:00', '%m/%d/%Y %H:%M:%S')
stop_time = datetime.strptime('8/20/2017 00:00:00', '%m/%d/%Y %H:%M:%S')
last_index = -1
summary_result = pd.DataFrame()

df_summary = ec.query_measurements_summary(primary_key, start_time, stop_time)
df_summary_local = df_summary.toPandas()
summary_keys = df_summary_local[["company", "site", "station", "sensor", "start_time", "stop_time", "meas_name", "meas_summary_name"]]
summary_result = df_summary_local["meas_summary_value"].apply(json.loads).apply(pd.Series)
summary_combined = pd.concat([summary_keys, summary_result], axis=1)

summary_combined.tail(5)

Stop Stream Analytics

The transformations can be stopped at any time via ec.stop_streaming() function call


In [ ]:
#Stop current near real-time processing
ec.stop_streaming()

Next Steps

Congratulations, you have successfully perfomed near real-time analytics on sample data aquired by a simulated wireless sensor network. The next step is to explore various capabilities of EpiData by creating your own custom analytics application!